package com.tpvlog.dfs.backupnode.log;

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.tpvlog.dfs.backupnode.BackupNode;
import com.tpvlog.dfs.backupnode.file.FSNameSystem;
import com.tpvlog.dfs.backupnode.server.NameNodeRpcClient;

/**
 * edits log复制组件
 */
public class EditsLogFetcher extends Thread {
    private static final Integer BACKUP_NODE_FETCH_SIZE = 10;

    private BackupNode backupNode;
    private NameNodeRpcClient rpcClient;
    private FSNameSystem namesystem;

    public EditsLogFetcher(BackupNode backupNode, FSNameSystem namesystem, NameNodeRpcClient rpcClient) {
        this.backupNode = backupNode;
        this.rpcClient = rpcClient;
        this.namesystem = namesystem;
    }

    @Override
    public void run() {
        System.out.println("edits log同步线程已经启动......");

        while (backupNode.isRunning()) {
            try {
                // 1.如果BackipNode正在进行元数据恢复，则等待其完成
                if (!namesystem.isFinishedRecover()) {
                    System.out.println("当前还没完成元数据恢复，不进行editlog同步......");
                    Thread.sleep(1000);
                    continue;
                }

                // 2.从上一次同步完成的txid开始进行日志拉取
                long syncedTxid = namesystem.getSyncedTxid();
                JSONArray editsLogs = rpcClient.fetchEditsLog(syncedTxid);

                if (editsLogs.size() == 0) {
                    System.out.println("没有拉取到任何一条editslog，等待1秒后继续尝试拉取");
                    Thread.sleep(1000);
                    continue;
                }

                if (editsLogs.size() < BACKUP_NODE_FETCH_SIZE) {
                    Thread.sleep(1000);
                    System.out.println("拉取到的edits log不足10条数据，等待1秒后再次继续去拉取");
                }

                // 3.进行日志回放
                for (int i = 0; i < editsLogs.size(); i++) {
                    JSONObject editsLog = editsLogs.getJSONObject(i);
                    System.out.println("拉取到一条editslog：" + editsLog.toJSONString());
                    String op = editsLog.getString("OP");

                    if (op.equals("MKDIR")) {
                        String path = editsLog.getString("PATH");
                        try {
                            namesystem.mkdir(editsLog.getLongValue("txid"), path);
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }else if(op.equals("CREATE")) {
                        String filename = editsLog.getString("PATH");
                        try {
                            namesystem.ceateFile(editsLog.getLongValue("txid"), filename);
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                }
                rpcClient.setRunning(true);
            } catch (Exception e) {
                rpcClient.setRunning(false);
            }
        }
    }
}
